In [1]:
# Initialize PySpark
APP_NAME = "Debugging Prediction Problems"
# If there is no SparkSession, create the environment
try:
    sc and spark
except NameError as e:
    import findspark
    findspark.init()
    import pyspark
    import pyspark.sql

    sc = pyspark.SparkContext()
    spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()

print("PySpark initiated...")
In [2]:
# Load the text file using the SparkContext
csv_lines = sc.textFile("../data/example.csv")
# Map the data to split the lines into a list
data = csv_lines.map(lambda line: line.split(","))
# Collect the dataset into local RAM
data.collect()
Out[2]:
In [3]:
# Turn the CSV lines into objects
def csv_to_record(line):
    parts = line.split(",")
    record = {
        "name": parts[0],
        "company": parts[1],
        "title": parts[2]
    }
    return record
# Apply the function to every record
records = csv_lines.map(csv_to_record)
# Inspect the first item in the dataset
records.first()
Out[3]:
In [4]:
# Group the records by the name of the person
grouped_records = records.groupBy(lambda x: x["name"])
# Show the first group
grouped_records.first()
# Count the jobs within each group
job_counts = grouped_records.map(
    lambda x: {
        "name": x[0],
        "job_count": len(x[1])
    }
)
job_counts.first()
job_counts.collect()
Out[4]:
In [5]:
# Compute a relation of words by line
words_by_line = csv_lines\
    .map(lambda line: line.split(","))
print(words_by_line.collect())
# Compute a relation of words
flattened_words = csv_lines\
    .map(lambda line: line.split(","))\
    .flatMap(lambda x: x)
flattened_words.collect()
Out[5]:
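In [ ]:
# A toy sketch of the map/flatMap difference on a tiny hand-made RDD:
# map yields one list per input line, while flatMap concatenates those
# lists into a single flat collection of words.
toy = sc.parallelize(["a,b", "c"])
print(toy.map(lambda line: line.split(",")).collect())      # [['a', 'b'], ['c']]
print(toy.flatMap(lambda line: line.split(",")).collect())  # ['a', 'b', 'c']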
In [6]:
from pyspark.sql import Row
# Convert the CSV into a pyspark.sql.Row
def csv_to_row(line):
    parts = line.split(",")
    row = Row(
        name=parts[0],
        company=parts[1],
        title=parts[2]
    )
    return row
# Apply the function to get rows in an RDD
rows = csv_lines.map(csv_to_row)
In [7]:
# Convert to a pyspark.sql.DataFrame
rows_df = rows.toDF()
# Register the DataFrame for Spark SQL
rows_df.registerTempTable("executives")
# Generate a new DataFrame with SQL using the SparkSession
job_counts = spark.sql("""
SELECT
name,
COUNT(*) AS total
FROM executives
GROUP BY name
""")
job_counts.show()
# Go back to an RDD
job_counts.rdd.collect()
Out[7]:
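In [ ]:
# A sketch of the same aggregation via the DataFrame API rather than SQL;
# the resulting count column is named "count" instead of "total".
rows_df.groupBy("name").count().show()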
In [8]:
# Load the parquet file containing flight delay records
on_time_dataframe = spark.read.parquet('../data/on_time_performance.parquet')
# Register the data for Spark SQL
on_time_dataframe.registerTempTable("on_time_performance")
# Check out the columns
on_time_dataframe.columns
# Check out some data
on_time_dataframe\
    .select("FlightDate", "TailNum", "Origin", "Dest", "Carrier", "DepDelay", "ArrDelay")\
    .show()
# Trim the fields and keep the result
trimmed_on_time = on_time_dataframe\
    .select(
        "FlightDate",
        "TailNum",
        "Origin",
        "Dest",
        "Carrier",
        "DepDelay",
        "ArrDelay"
    )
# Sample 0.01% of the data and show
trimmed_on_time.sample(False, 0.0001).show()
In [10]:
# Compute a histogram of departure delays
on_time_dataframe\
    .select("DepDelay")\
    .rdd\
    .flatMap(lambda x: x)\
    .histogram(10)
Out[10]:
In [14]:
%matplotlib inline
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
# Function to plot a histogram using pyplot
def create_hist(rdd_histogram_data):
    """Given an RDD.histogram, plot a pyplot histogram"""
    heights = np.array(rdd_histogram_data[1])
    full_bins = rdd_histogram_data[0]
    mid_point_bins = full_bins[:-1]
    widths = [abs(i - j) for i, j in zip(full_bins[:-1], full_bins[1:])]
    bar = plt.bar(mid_point_bins, heights, width=widths, color='b')
    return bar
# Compute a histogram of departure delays
departure_delay_histogram = on_time_dataframe\
    .select("DepDelay")\
    .rdd\
    .flatMap(lambda x: x)\
    .histogram([-60, -30, -15, -10, -5, 0, 5, 10, 15, 30, 60, 90, 120, 180])
create_hist(departure_delay_histogram)
Out[14]:
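In [ ]:
# A small optional sketch: the same histogram with axis labels added via pyplot,
# reusing create_hist and the departure_delay_histogram computed above.
create_hist(departure_delay_histogram)
plt.xlabel("Departure delay (minutes)")
plt.ylabel("Flights")
plt.show()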
In [12]:
# Dump the unneeded fields
tail_numbers = on_time_dataframe.rdd.map(lambda x: x.TailNum)
tail_numbers = tail_numbers.filter(lambda x: x != '')
# distinct() gets us unique tail numbers
unique_tail_numbers = tail_numbers.distinct()
# now we need a count() of unique tail numbers
airplane_count = unique_tail_numbers.count()
print("Total airplanes: {}".format(airplane_count))
In [15]:
# Use SQL to look at the total flights by month across 2015
on_time_dataframe.registerTempTable("on_time_dataframe")
total_flights_by_month = spark.sql(
    """SELECT Month, Year, COUNT(*) AS total_flights
    FROM on_time_dataframe
    GROUP BY Year, Month
    ORDER BY Year, Month"""
)
# This map/asDict trick makes the rows print a little prettier. It is optional.
flights_chart_data = total_flights_by_month.rdd.map(lambda row: row.asDict())
flights_chart_data.collect()
Out[15]:
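In [ ]:
# An optional sketch plotting the monthly totals with pyplot, assuming (as the
# comment above states) the data covers 2015 only, so Month alone orders the x-axis.
monthly_totals = flights_chart_data.collect()
plt.plot(
    [row["Month"] for row in monthly_totals],
    [row["total_flights"] for row in monthly_totals]
)
plt.xlabel("Month")
plt.ylabel("Total flights")
plt.show()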
In [16]:
# Filter down to the fields we need to identify and link to a flight
flights = on_time_dataframe.rdd.map(lambda x:
    (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
)
# Group flights by tail number, sorted by date, then flight number, then origin/dest
flights_per_airplane = flights\
    .map(lambda nameTuple: (nameTuple[5], [nameTuple[0:5]]))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda tuple:
        {
            'TailNum': tuple[0],
            'Flights': sorted(tuple[1], key=lambda x: (x[1], x[2], x[3], x[4]))
        }
    )
flights_per_airplane.first()
Out[16]:
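In [ ]:
# An equivalent sketch using groupByKey/mapValues instead of reduceByKey on lists;
# flights_per_airplane_alt is a name we introduce for illustration.
flights_per_airplane_alt = flights\
    .map(lambda t: (t[5], t[0:5]))\
    .groupByKey()\
    .mapValues(lambda fs: sorted(fs, key=lambda x: (x[1], x[2], x[3], x[4])))
flights_per_airplane_alt.first()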
In [ ]:
total_flights = on_time_dataframe.count()
# Flights that were late leaving...
late_departures = on_time_dataframe.filter(
    on_time_dataframe.DepDelayMinutes > 0
)
total_late_departures = late_departures.count()
print(total_late_departures)
# Flights that were late arriving...
late_arrivals = on_time_dataframe.filter(
    on_time_dataframe.ArrDelayMinutes > 0
)
total_late_arrivals = late_arrivals.count()
print(total_late_arrivals)
# Get the percentage of flights that are late, rounded to 1 decimal place
pct_late = round((total_late_arrivals / (total_flights * 1.0)) * 100, 1)
In [ ]:
# Flights that left late but made up time to arrive on time...
on_time_heroes = on_time_dataframe.filter(
    (on_time_dataframe.DepDelayMinutes > 0)
    &
    (on_time_dataframe.ArrDelayMinutes <= 0)
)
total_on_time_heroes = on_time_heroes.count()
print(total_on_time_heroes)
In [20]:
print("Total flights: {:,}".format(total_flights))
print("Late departures: {:,}".format(total_late_departures))
print("Late arrivals: {:,}".format(total_late_arrivals))
print("Recoveries: {:,}".format(total_on_time_heros))
print("Percentage Late: {}%".format(pct_late))
In [21]:
# Get the average minutes late departing and arriving
spark.sql("""
SELECT
ROUND(AVG(DepDelay),1) AS AvgDepDelay,
ROUND(AVG(ArrDelay),1) AS AvgArrDelay
FROM on_time_performance
"""
).show()
In [22]:
# Why are flights late? Let's look at some delayed flights and the delay causes
late_flights = spark.sql("""
    SELECT
        ArrDelayMinutes,
        WeatherDelay,
        CarrierDelay,
        NASDelay,
        SecurityDelay,
        LateAircraftDelay
    FROM
        on_time_performance
    WHERE
        WeatherDelay IS NOT NULL
        OR CarrierDelay IS NOT NULL
        OR NASDelay IS NOT NULL
        OR SecurityDelay IS NOT NULL
        OR LateAircraftDelay IS NOT NULL
    ORDER BY
        FlightDate
""")
late_flights.sample(False, 0.01).show()
In [23]:
# Calculate the percentage contribution to delay for each source
total_delays = spark.sql("""
SELECT
ROUND(SUM(WeatherDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_weather_delay,
ROUND(SUM(CarrierDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_carrier_delay,
ROUND(SUM(NASDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_nas_delay,
ROUND(SUM(SecurityDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_security_delay,
ROUND(SUM(LateAircraftDelay)/SUM(ArrDelayMinutes) * 100, 1) AS pct_late_aircraft_delay
FROM on_time_performance
""")
total_delays.show()
In [26]:
# Eyeball the first to define our buckets
weather_delay_histogram = on_time_dataframe\
    .select("WeatherDelay")\
    .rdd\
    .flatMap(lambda x: x)\
    .histogram([1, 5, 10, 15, 30, 60, 120, 240, 480, 720, 24*60.0])
print(weather_delay_histogram)
In [25]:
create_hist(weather_delay_histogram)
Out[25]:
In [ ]:
# Transform the data into something easily consumed by d3
def histogram_to_publishable(histogram):
    record = {'key': 1, 'data': []}
    for label, value in zip(histogram[0], histogram[1]):
        record['data'].append(
            {
                'label': label,
                'value': value
            }
        )
    return record
# Recompute the weather histogram, filtering to flights that actually had a weather delay
weather_delay_histogram = on_time_dataframe\
    .filter(
        (on_time_dataframe.WeatherDelay.isNotNull())
        &
        (on_time_dataframe.WeatherDelay > 0)
    )\
    .select("WeatherDelay")\
    .rdd\
    .flatMap(lambda x: x)\
    .histogram([0, 15, 30, 60, 120, 240, 480, 720, 24*60.0])
print(weather_delay_histogram)
record = histogram_to_publishable(weather_delay_histogram)
record
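In [ ]:
# A sketch serializing the publishable record for d3 as JSON; the output path
# below is hypothetical and not part of the original data layout.
import json
with open("../data/weather_delay_histogram.json", "w") as f:
    json.dump(record, f)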
In [31]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf
schema = StructType([
    StructField("ArrDelay", DoubleType(), True),       # "ArrDelay":5.0
    StructField("CRSArrTime", TimestampType(), True),  # "CRSArrTime":"2015-12-31T03:20:00.000-08:00"
    StructField("CRSDepTime", TimestampType(), True),  # "CRSDepTime":"2015-12-31T03:05:00.000-08:00"
    StructField("Carrier", StringType(), True),        # "Carrier":"WN"
    StructField("DayOfMonth", IntegerType(), True),    # "DayOfMonth":31
    StructField("DayOfWeek", IntegerType(), True),     # "DayOfWeek":4
    StructField("DayOfYear", IntegerType(), True),     # "DayOfYear":365
    StructField("DepDelay", DoubleType(), True),       # "DepDelay":14.0
    StructField("Dest", StringType(), True),           # "Dest":"SAN"
    StructField("Distance", DoubleType(), True),       # "Distance":368.0
    StructField("FlightDate", DateType(), True),       # "FlightDate":"2015-12-30T16:00:00.000-08:00"
    StructField("FlightNum", StringType(), True),      # "FlightNum":"6109"
    StructField("Origin", StringType(), True),         # "Origin":"TUS"
])
features = spark.read.json(
    "../data/simple_flight_delay_features.jsonl.bz2",
    schema=schema
)
features.first()
Out[31]:
In [32]:
#
# Check for nulls in features before using Spark ML
#
null_counts = [(column, features.where(features[column].isNull()).count()) for column in features.columns]
cols_with_nulls = filter(lambda x: x[1] > 0, null_counts)
print(list(cols_with_nulls))
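In [ ]:
# If any column had contained nulls, a minimal sketch of handling them before
# Spark ML would be to drop the offending rows (or fill numeric columns with a
# default). The name features_clean is hypothetical.
features_clean = features.dropna()  # or features.fillna(0.0) for numeric columns
features_clean.count()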
In [33]:
#
# Add a Route variable to replace FlightNum
#
from pyspark.sql.functions import lit, concat
features_with_route = features.withColumn(
    'Route',
    concat(
        features.Origin,
        lit('-'),
        features.Dest
    )
)
features_with_route.select("Origin", "Dest", "Route").show(5)
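In [ ]:
# An equivalent sketch building the Route key with concat_ws, which takes the
# separator first and avoids the explicit lit('-') literal.
from pyspark.sql.functions import concat_ws
features.withColumn(
    "Route",
    concat_ws("-", features.Origin, features.Dest)
).select("Origin", "Dest", "Route").show(5)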
In [34]:
#
# Use pyspark.ml.feature.Bucketizer to bucketize ArrDelay
#
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -15.0, 0, 30.0, float("inf")]
bucketizer = Bucketizer(
    splits=splits,
    inputCol="ArrDelay",
    outputCol="ArrDelayBucket"
)
ml_bucketized_features = bucketizer.transform(features_with_route)
# Check the buckets out
ml_bucketized_features.select("ArrDelay", "ArrDelayBucket").show()
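In [ ]:
# A quick sanity-check sketch of the bucket boundaries on a few hand-made delays;
# sample_delays is a throwaway DataFrame we construct just for illustration.
sample_delays = spark.createDataFrame(
    [(-20.0,), (-5.0,), (10.0,), (45.0,)],
    ["ArrDelay"]
)
bucketizer.transform(sample_delays).show()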
In [35]:
#
# Extract features using tools in pyspark.ml.feature
#
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Turn categorical fields into numeric index columns with StringIndexer
for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
               "Origin", "Dest", "Route"]:
    string_indexer = StringIndexer(
        inputCol=column,
        outputCol=column + "_index"
    )
    ml_bucketized_features = string_indexer.fit(ml_bucketized_features)\
        .transform(ml_bucketized_features)
# Check out the indexes
ml_bucketized_features.show(6)
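In [ ]:
# A sketch inspecting one string-to-index mapping by fitting a single indexer;
# the Carrier_index_demo output column name is hypothetical.
carrier_index_model = StringIndexer(
    inputCol="Carrier",
    outputCol="Carrier_index_demo"
).fit(features_with_route)
carrier_index_model.labels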
In [36]:
# Handle continuous, numeric fields by combining them into one feature vector
numeric_columns = ["DepDelay", "Distance"]
index_columns = ["Carrier_index", "DayOfMonth_index",
"DayOfWeek_index", "DayOfYear_index", "Origin_index",
"Origin_index", "Dest_index", "Route_index"]
vector_assembler = VectorAssembler(
inputCols=numeric_columns + index_columns,
outputCol="Features_vec"
)
final_vectorized_features = vector_assembler.transform(ml_bucketized_features)
# Drop the index columns
for column in index_columns:
    final_vectorized_features = final_vectorized_features.drop(column)
# Check out the features
final_vectorized_features.show()
In [37]:
#
# Split into train/test sets, then train and evaluate the classifier
#
# Test/train split
training_data, test_data = final_vectorized_features.randomSplit([0.7, 0.3])
# Instantiate and fit random forest classifier
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(
    featuresCol="Features_vec",
    labelCol="ArrDelayBucket",
    maxBins=4657,
    maxMemoryInMB=1024
)
model = rfc.fit(training_data)
# Evaluate model using test data
predictions = model.transform(test_data)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="ArrDelayBucket", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = {}".format(accuracy))
# Check a sample
predictions.sample(False, 0.001, 18).orderBy("CRSDepTime").show(6)
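In [ ]:
# A sketch of two quick follow-ups: which features the forest leaned on most
# (featureImportances on the fitted model), and how predictions distribute
# across the actual delay buckets.
print(model.featureImportances)
predictions\
    .groupBy("ArrDelayBucket", "prediction")\
    .count()\
    .orderBy("ArrDelayBucket", "prediction")\
    .show()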
In [ ]: